Adding asynchronous fetching for DirectIO directory #134803
Hi @benwtrent, I've created a changelog YAML for you.
import static java.nio.ByteOrder.LITTLE_ENDIAN;

public class AsyncDirectIOIndexInput extends IndexInput {
This is the main change.
Hey reviewers, I am marking this "ready to review", but obviously, I think pieces of it need to be split out.
Basically, the main focus of the review should be the AsyncDirectIOIndexInput. The rest is structure I needed to actually put it through its paces and will be moved out when I can into a separate PR.
Pinging @elastic/es-search-relevance (Team:Search Relevance)
Pinging @elastic/es-search-foundations (Team:Search Foundations)
…rch into exp/async-direct-io
server/src/main/java/org/elasticsearch/index/store/AsyncDirectIOIndexInput.java
…rch into exp/async-direct-io
server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java
public IndexInput openInput(String name, IOContext context) throws IOException {
int blockSize = getBlockSize(path);
ensureOpen();
if (useDirectIO(name, context, OptionalLong.of(fileLength(name)))) {
again, this will always be true here.
(could we just use the implementation in FsDirectoryFactory somehow?)
// Reading immediately after seeking past EOF should throw EOFException
expectThrows(EOFException.class, () -> i.readByte());
i.close();
nit: Put IndexInput i in a try block?
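One way to read the suggestion above is try-with-resources, so the input is closed even when the read throws. A minimal sketch, using a hypothetical `FakeInput` stand-in rather than Lucene's `IndexInput` (which is `Closeable`, so the same pattern should carry over):

```java
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

final class TryWithResourcesSketch {
    /** Hypothetical stand-in for an input that is positioned past EOF. */
    static final class FakeInput implements Closeable {
        boolean closed = false;

        byte readByte() throws IOException {
            throw new EOFException("read past EOF");
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Returns true if the read threw EOFException; the input is closed either way. */
    static boolean readPastEofThrows(FakeInput input) {
        try (FakeInput i = input) {
            i.readByte();
            return false;
        } catch (EOFException expected) {
            // The resource has already been closed by the time this runs.
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the actual test, the `expectThrows` call would sit inside the `try` body and the explicit `i.close()` would go away.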
}

// Ping-pong seeks should be really fast, since the position should be within buffer.
// The test should complete within sub-second times, not minutes.
This doesn't check the time. Is it worth adding a stopwatch check for a long time, say 1 minute?
@thecoop honestly, I am not sure. That would likely make it flaky, and I am inheriting these tests from Lucene.
int offset = 84;
float[] vectorActual = new float[768];
int[] toSeek = new int[] { 1, 2, 3, 5, 6, 9, 11, 14, 15, 16, 18, 23, 24, 25, 26, 29, 30, 31 };
int byteSize = 768 * 4;
Suggested change:
- int byteSize = 768 * 4;
+ int byteSize = vectorActual.length * Float.BYTES;
import java.util.ArrayList;
import java.util.List;

public class AsyncDirectIOIndexInputTests extends ESTestCase {
There's quite a few magic numbers in this class. Could they be consolidated and some comments added?
buffer.flip();
buffer.position(delta);
} catch (IOException ioe) {
throw new IOException(ioe.getMessage() + ": " + this, ioe);
Do we need to rethrow as an IOException? Note that this hides any thrown subclasses of IOException.
I am not sure, this is copied from Lucene. I would hope once Lucene gets async directIO, we can remove this class and rely on Lucene's.
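To illustrate the concern raised in this thread: once the exception is wrapped in a plain `IOException`, a caller's `catch (EOFException e)` no longer matches, although the original is still reachable through `getCause()`. A small self-contained sketch (the class and method names are illustrative, not from the PR):

```java
import java.io.EOFException;
import java.io.IOException;

final class RethrowSketch {
    /** Mirrors the pattern in the diff: wrap any IOException in a new IOException with extra context. */
    static void readWrapped(String context) throws IOException {
        try {
            throw new EOFException("read past EOF");
        } catch (IOException ioe) {
            throw new IOException(ioe.getMessage() + ": " + context, ioe);
        }
    }

    /** Reports which catch clause a caller ends up in. */
    static String classify(String context) {
        try {
            readWrapped(context);
            return "no exception";
        } catch (EOFException e) {
            // Not reached: the subclass type was lost when wrapping.
            return "EOFException";
        } catch (IOException e) {
            return "IOException caused by " + e.getCause().getClass().getSimpleName();
        }
    }
}
```

Whether that matters here depends on whether any caller distinguishes subclasses such as `EOFException`; the sketch only shows the mechanism.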
*/
void prefetch(long pos, long length) {
// first determine how many slots we need given the length
int numSlots = (int) Math.min((length + prefetchBytesSize - 1) / prefetchBytesSize, Integer.MAX_VALUE - 1);
Do we really want a max number of slots as Integer.MAX_VALUE - 1? Wouldn't that cause significant problems having that number?
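For reference, the `numSlots` line is a ceiling division clamped to the `int` range; in the diff, the effective cap comes from the `while` condition on `maxTotalPrefetches`. The sketch below makes that cap explicit up front. It assumes an 8192-byte prefetch size and a limit of 64 (figures mentioned elsewhere in this conversation), and `slotsNeeded` / `slotsToSchedule` are hypothetical helpers, not the PR's code:

```java
final class PrefetchSlots {
    /**
     * Number of prefetchBytesSize-sized slots needed to cover length bytes (ceiling division).
     * Assumes length is small enough that the addition does not overflow.
     */
    static long slotsNeeded(long length, int prefetchBytesSize) {
        return (length + prefetchBytesSize - 1) / prefetchBytesSize;
    }

    /** Slots that can actually be scheduled given how many are already in flight. */
    static int slotsToSchedule(long length, int prefetchBytesSize, int inFlight, int maxTotalPrefetches) {
        int available = Math.max(0, maxTotalPrefetches - inFlight);
        // The result is bounded by available, so the narrowing cast is safe.
        return (int) Math.min(slotsNeeded(length, prefetchBytesSize), available);
    }
}
```

With this shape the clamp to `Integer.MAX_VALUE - 1` is unnecessary, because the count can never exceed the remaining prefetch budget.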
void prefetch(long pos, long length) {
// first determine how many slots we need given the length
int numSlots = (int) Math.min((length + prefetchBytesSize - 1) / prefetchBytesSize, Integer.MAX_VALUE - 1);
while (numSlots > 0 && (this.posToSlot.size() + this.pendingPrefetches.size()) < maxTotalPrefetches) {
So this doesn't do any prefetching if we've got max in-progress then. That's probably the right thing to do, but it may be worth making a note that in high-pressure situations (IO taking a long time, exceptions thrown, large blocking prefetches, whatever), prefetching won't be doing anything. Is it worth a debug log in this case to help us diagnose IO problems around this in the future?
@thecoop let me add a logger! yes
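A rough sketch of the behaviour discussed in this thread: prefetches are dropped once the in-flight limit is reached, and a counter records the drops so a debug log could report them. `BoundedPrefetcher` and its methods are hypothetical and only model the bookkeeping, not the actual IO or the PR's data structures:

```java
import java.util.HashSet;
import java.util.Set;

final class BoundedPrefetcher {
    private final int maxTotalPrefetches;
    private final Set<Long> inFlight = new HashSet<>();
    private long skipped = 0;

    BoundedPrefetcher(int maxTotalPrefetches) {
        this.maxTotalPrefetches = maxTotalPrefetches;
    }

    /** Returns true if pos is (or already was) scheduled, false if it was dropped because the limit was reached. */
    boolean prefetch(long pos) {
        if (inFlight.contains(pos)) {
            return true; // already scheduled, nothing to do
        }
        if (inFlight.size() >= maxTotalPrefetches) {
            skipped++;
            // A real implementation might emit a debug log here with pos and the current limit.
            return false;
        }
        inFlight.add(pos);
        return true;
    }

    /** Marks a prefetch as finished, freeing one slot. */
    void complete(long pos) {
        inFlight.remove(pos);
    }

    long skippedCount() {
        return skipped;
    }
}
```

Logging a running count rather than every dropped request may keep the output manageable when IO is slow for a long stretch.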
Happy with the overall structure of this. I've commented on a few details that could be worked on here, or separately later on
}; // can we set on both - node and index level, some nodes might be running on NFS so they might need simple rather than native
}, Property.IndexScope, Property.NodeScope);

public static final Setting<Integer> ASYNC_PREFETCH_LIMIT = Setting.intSetting(
@thecoop I made this change as I figured we want a way to shut it off or increase the limit. 64 is pretty conservative with only 8k buffers, but it seems better to be safer than not.
I am also not sure we actually want to document this setting. Ideally, it's never touched.
One significant cost of DirectIO is simply waiting for bytes to be read in a path dedicated to compute.
This change adds "prefetch" capabilities to DirectIO by allowing callers to prefetch particular file positions. For simplicity, I have it always prefetch a DirectIO page. Initially I did a bunch of work to allow prefetching multiple pages (e.g. more than 8192 bytes), but this greatly complicated the implementation. I think this can be added as a follow-up.
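In terms of offsets, prefetching a single page for an arbitrary file position amounts to rounding the position down to a page boundary. A minimal sketch, assuming a power-of-two page size such as 8192 bytes; `pageStart` and `offsetInPage` are hypothetical helper names, not the PR's API:

```java
final class PageAlign {
    /** Start offset of the page containing pos. Requires pageSize to be a positive power of two. */
    static long pageStart(long pos, int pageSize) {
        if (pageSize <= 0 || Integer.bitCount(pageSize) != 1) {
            throw new IllegalArgumentException("pageSize must be a positive power of two: " + pageSize);
        }
        // Clearing the low bits rounds down to the nearest multiple of pageSize.
        return pos & ~((long) pageSize - 1);
    }

    /** Offset of pos within its page, i.e. where reading would start inside the prefetched buffer. */
    static int offsetInPage(long pos, int pageSize) {
        return (int) (pos - pageStart(pos, pageSize));
    }
}
```

A request that spans a page boundary would need a second page, which is the multi-page case deferred to a follow-up.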
Here are some benchmarks for vectors. Note, the recall difference indicates I am doing something wrong right now. I am thinking I have a couple off-by-one errors and I am still investigating.
Opening as a draft until I can figure out this weird bug (and of course, remove all my extraneous changes used for testing this thing)...This is labeled as 9.2, but I would be very surprised if it actually lands there.
This PR:
Baseline DirectIO:
Baseline MMAP (when many floating points can still just reside in memory):